Load the airport and flight data from Cloudant


In [ ]:
cloudantHost='dtaieb.cloudant.com'
cloudantUserName='weenesserliffircedinvers'
cloudantPassword='72a5c4f939a9e2578698029d2bb041d775d088b5'

In [ ]:
airports = sqlContext.read.format("com.cloudant.spark").option("cloudant.host",cloudantHost)\
    .option("cloudant.username",cloudantUserName).option("cloudant.password",cloudantPassword)\
    .option("schemaSampleSize", "-1").load("flight-metadata")
airports.cache()
airports.registerTempTable("airports")
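
Since we let Cloudant's schema discovery sample all the documents (schemaSampleSize of -1), a quick optional sanity check is to print the inferred schema and row count before visualizing the data:


In [ ]:
airports.printSchema()
print(airports.count())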

In [ ]:
import pixiedust

# Display the airports data
display(airports)

In [ ]:
flights = sqlContext.read.format("com.cloudant.spark").option("cloudant.host",cloudantHost)\
    .option("cloudant.username",cloudantUserName).option("cloudant.password",cloudantPassword)\
    .option("schemaSampleSize", "-1").load("pycon_flightpredict_training_set")
flights.cache()
flights.registerTempTable("training")

In [ ]:
# Display the flights data
display(flights)

Build the vertices and edges DataFrames from the data


In [ ]:
from pyspark.sql import functions as f
from pyspark.sql.types import *

rdd = flights.rdd.flatMap(lambda s: [s.arrivalAirportFsCode, s.departureAirportFsCode]).distinct()\
    .map(lambda row:[row])
vertices = airports.join(
      sqlContext.createDataFrame(rdd, StructType([StructField("fs",StringType())])), "fs"
    ).dropDuplicates(["fs"]).withColumnRenamed("fs","id")

print(vertices.count())

In [ ]:
edges = flights.withColumnRenamed("arrivalAirportFsCode","dst")\
    .withColumnRenamed("departureAirportFsCode","src")\
    .drop("departureWeather").drop("arrivalWeather").drop("pt_type").drop("_id").drop("_rev")

print(edges.count())
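
Optionally, you can verify that every src and dst in the edges DataFrame has a matching vertex, since flights referencing airports missing from the metadata would silently drop out of the graph algorithms. A minimal sketch, assuming Spark 2.x (union and the left_anti join type are not available in 1.6):


In [ ]:
from pyspark.sql.functions import col

referenced = edges.select(col("src").alias("id"))\
    .union(edges.select(col("dst").alias("id"))).distinct()
# airport codes that appear in a flight record but not in the vertices DataFrame
orphans = referenced.join(vertices.select("id"), "id", "left_anti")
print(orphans.count())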

Install the GraphFrames package using the PixieDust packageManager

The GraphFrames package to install depends on the environment.

Spark 1.6

  • graphframes:graphframes:0.5.0-spark1.6-s_2.11

Spark 2.x

  • graphframes:graphframes:0.5.0-spark2.1-s_2.11

In addition, recent versions of GraphFrames depend on other packages, which also need to be installed:

  • com.typesafe.scala-logging:scala-logging-api_2.11:2.1.2
  • com.typesafe.scala-logging:scala-logging-slf4j_2.11:2.1.2

Note: After installing packages, the kernel will need to be restarted and all the previous cells re-run (including the install package cell).


In [ ]:
import pixiedust

if sc.version.startswith('1.6.'):  # Spark 1.6
    pixiedust.installPackage("graphframes:graphframes:0.5.0-spark1.6-s_2.11")
elif sc.version.startswith('2.'):  # Spark 2.x
    pixiedust.installPackage("graphframes:graphframes:0.5.0-spark2.1-s_2.11")


pixiedust.installPackage("com.typesafe.scala-logging:scala-logging-api_2.11:2.1.2")
pixiedust.installPackage("com.typesafe.scala-logging:scala-logging-slf4j_2.11:2.1.2")

print("done")

Create the GraphFrame from the vertices and edges DataFrames


In [ ]:
from graphframes import GraphFrame
g = GraphFrame(vertices, edges)
display(g)
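
As a quick sanity check, confirm the graph was built from the expected number of airports and flights:


In [ ]:
print("vertices: %d, edges: %d" % (g.vertices.count(), g.edges.count()))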

Compute the degree for each vertex in the graph

The degree of a vertex is the number of edges incident to it. In a directed graph, the in-degree is the number of edges where the vertex is the destination, and the out-degree is the number of edges where the vertex is the source. GraphFrames provides degrees, outDegrees and inDegrees properties that each return a DataFrame containing the id of the vertex and its number of edges. We then sort them in descending order.


In [ ]:
from pyspark.sql.functions import *
degrees = g.degrees.sort(desc("degree"))
display( degrees )
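
The inDegrees and outDegrees properties mentioned above work the same way. For example, here's a sketch that joins them to compare incoming and outgoing traffic per airport:


In [ ]:
from pyspark.sql.functions import desc

inDeg = g.inDegrees    # DataFrame with columns id, inDegree
outDeg = g.outDegrees  # DataFrame with columns id, outDegree
display( inDeg.join(outDeg, "id").orderBy(desc("inDegree")) )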

Compute a list of shortest paths for each vertex to a specified list of landmarks

For this we use the shortestPaths API, which returns a DataFrame containing the properties of each vertex plus an extra column called distances that holds the number of hops to each landmark. In the following code, we use BOS and LAX as the landmarks.


In [ ]:
r = g.shortestPaths(landmarks=["BOS", "LAX"]).select("id", "distances")
display(r)
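
The distances column is a map from landmark id to hop count, so it can be queried with regular DataFrame operations. For example, assuming Spark 2.x, here's a sketch that lists the airports within 2 hops of BOS:


In [ ]:
from pyspark.sql.functions import explode, col

# explode the map into one (landmark, hops) row per entry
hops = r.select("id", explode("distances").alias("landmark", "hops"))
display( hops.filter((col("landmark") == "BOS") & (col("hops") <= 2)) )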

Compute the pageRank for each vertex in the graph

PageRank is a famous algorithm used by Google Search to rank vertices in a graph by order of importance. To compute the PageRank, we'll use the pageRank API, which returns a new graph in which the vertices have an extra pagerank column holding the PageRank score for each vertex, and the edges have an extra weight column holding the edge weight that contributed to the score. We'll then display the vertex ids and associated PageRank scores, sorted in descending order:


In [ ]:
from pyspark.sql.functions import *

ranks = g.pageRank(resetProbability=0.20, maxIter=5)

rankedVertices = ranks.vertices.select("id","pagerank").orderBy(desc("pagerank"))
rankedEdges = ranks.edges.select("src", "dst", "weight").orderBy(desc("weight") )

ranks = GraphFrame(rankedVertices, rankedEdges)
display(ranks)
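
pageRank also supports a personalized variant: passing a sourceId ranks vertices by their importance relative to that single vertex. For example, here's a sketch for the ranking as seen from BOS:


In [ ]:
from pyspark.sql.functions import desc

bosRanks = g.pageRank(resetProbability=0.15, sourceId="BOS", maxIter=5)
display( bosRanks.vertices.select("id", "pagerank").orderBy(desc("pagerank")) )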

Search routes between 2 airports with specific criteria

In this section, we want to find all the routes between Boston and San Francisco operated by United Airlines with at most 2 hops. To accomplish this, we use the bfs (Breadth First Search) API, which returns a DataFrame containing the shortest paths between the matching vertices. For clarity, we'll keep only the edges when displaying the results.


In [ ]:
paths = g.bfs(fromExpr="id='BOS'",toExpr="id = 'SFO'",edgeFilter="carrierFsCode='UA'", maxPathLength = 2)\
    .drop("from").drop("to")
paths.cache()
display(paths)
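
Because bfs only returns the shortest matching paths, every row has the same number of hops, and the intermediate airports (if any) show up as struct columns named v1, v2, etc. Here's a sketch that reports the hop count and the distinct connecting airports:


In [ ]:
# after dropping "from" and "to", any remaining v* column is an intermediate stop
stops = [c for c in paths.columns if c.startswith("v")]
print("intermediate stops on the shortest matching routes: %d" % len(stops))
if stops:
    display( paths.select(stops[0] + ".id").distinct() )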

Find all airports that do not have direct flights between each other

In this section, we'll use a very powerful GraphFrames search feature based on patterns called motifs to find vertices. The pattern we'll use is "(a)-[]->(b);(b)-[]->(c);!(a)-[]->(c)", which searches for all vertices a, b and c such that there is an edge from a to b and an edge from b to c, but no direct edge from a to c. Also, because the search is computationally expensive, we reduce the number of edges by grouping the flights that have the same src and dst.


In [ ]:
from pyspark.sql.functions import *

h = GraphFrame(g.vertices, g.edges.select("src","dst")\
   .groupBy("src","dst").agg(count("src").alias("count")))

query = h.find("(a)-[]->(b);(b)-[]->(c);!(a)-[]->(c)").drop("b")
query.cache()
display(query)
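
Because the motif results are returned as struct columns, they can be refined with regular DataFrame expressions. For example, here's a sketch that restricts the results to airport pairs requiring a connection out of Boston:


In [ ]:
display( query.filter("a.id = 'BOS'")\
    .selectExpr("a.id as origin", "c.id as destination").distinct() )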

Compute the strongly connected components for this graph

A strongly connected component is a subgraph in which every vertex is reachable from every other vertex. To compute the components, we'll use the stronglyConnectedComponents API, which returns a DataFrame containing all the vertices with an extra component column holding the id of the component to which each vertex belongs. We then group the rows by component and count the member vertices, which gives a good idea of the distribution of components in the graph.


In [ ]:
from pyspark.sql.functions import *
components = g.stronglyConnectedComponents(maxIter=10).select("id","component")\
    .groupBy("component").agg(count("id").alias("count")).orderBy(desc("count"))
display(components)
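
To drill into a specific component, keep the full result of stronglyConnectedComponents and filter it. Here's a sketch that lists the member airports of the largest component (the result is cached since the algorithm is expensive to re-run):


In [ ]:
from pyspark.sql.functions import count, desc

scc = g.stronglyConnectedComponents(maxIter=10)
scc.cache()
largest = scc.groupBy("component").agg(count("id").alias("count"))\
    .orderBy(desc("count")).first()["component"]
display( scc.filter(scc["component"] == largest).select("id") )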

Detect communities in the graph using Label Propagation algorithm

The Label Propagation algorithm is a popular algorithm for finding communities within a graph. It has the advantage of being computationally inexpensive and thus works well with large graphs. To compute the communities, we'll use the labelPropagation API, which returns a DataFrame containing all the vertices with an extra label column holding the id of the community to which each vertex belongs. As with the strongly connected components, we then group the rows by label and count the member vertices.


In [ ]:
from pyspark.sql.functions import *
communities = g.labelPropagation(maxIter=5).select("id", "label")\
    .groupBy("label").agg(count("id").alias("count")).orderBy(desc("count"))
display(communities)
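
Note that Label Propagation is nondeterministic, so the label ids (and sometimes the communities themselves) can vary between runs. To inspect the members of the largest community, here's a sketch along the same lines as above:


In [ ]:
from pyspark.sql.functions import count, desc

labeled = g.labelPropagation(maxIter=5)
labeled.cache()
top = labeled.groupBy("label").agg(count("id").alias("count"))\
    .orderBy(desc("count")).first()["label"]
display( labeled.filter(labeled["label"] == top).select("id") )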

Use AggregateMessages to compute the average flight delays by originating airport

The aggregateMessages API is not currently available in Python, so we use the PixieDust Scala bridge to call the Scala API.

Note: PixieDust automatically rebinds the Python GraphFrame variable g into a Scala GraphFrame with the same name.


In [ ]:
%%scala
import org.graphframes.lib.AggregateMessages
import org.apache.spark.sql.functions.{avg,desc,floor}

// For each airport, average the delays of the departing flights
val msgToSrc = AggregateMessages.edge("deltaDeparture")
val __agg = g.aggregateMessages
  .sendToSrc(msgToSrc)  // send each flight delay to source
  .agg(floor(avg(AggregateMessages.msg)).as("averageDelays"))  // average up all delays
  .orderBy(desc("averageDelays"))
  .limit(10)
__agg.cache()
__agg.show()

In [ ]:
display(__agg)
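
Since each message sent to a source vertex here is simply the deltaDeparture of a departing flight, the Scala result can be cross-checked from Python with a plain groupBy on the edges DataFrame:


In [ ]:
from pyspark.sql.functions import avg, desc, floor

pyAgg = edges.groupBy("src")\
    .agg(floor(avg("deltaDeparture")).alias("averageDelays"))\
    .orderBy(desc("averageDelays")).limit(10)
display(pyAgg)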
